Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1757] Add retry when sending RPC to LifecycleManager #3008

Open
wants to merge 24 commits into
base: main
Choose a base branch
from

Conversation

zaynt4606
Copy link
Contributor

@zaynt4606 zaynt4606 commented Dec 19, 2024

What changes were proposed in this pull request?

Retry seding RPC to LifecycleManager when TimeoutException.

Why are the changes needed?

RPC messages are processed by Dispatcher.threadpool which its numThreads depends on numUsableCores.
In some cases (k8s) the numThreads of LifecycleManager are not enough while the RPCs are a lot so there are TimeoutExceptions.
Add retry when there are TimeoutExceptions.

Does this PR introduce any user-facing change?

No.

Another way is to adjust the configuration celeborn.lifecycleManager.rpc.dispatcher.threads to add the numThreads.
This way is more affective.

How was this patch tested?

Cluster testing.

@@ -29,7 +29,7 @@ license: |
| celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | |
| celeborn.&lt;module&gt;.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting <module> to `push`, it works for Flink shuffle client push data. | | |
| celeborn.&lt;module&gt;.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
| celeborn.&lt;module&gt;.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @SteNicholas Seems the doc generation depends on the developer environment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can not pass the GA, need to revert it.

Copy link

codecov bot commented Dec 19, 2024

Codecov Report

Attention: Patch coverage is 35.55556% with 29 lines in your changes missing coverage. Please review.

Project coverage is 32.55%. Comparing base (4aabe37) to head (09a7cdb).
Report is 59 commits behind head on main.

Files with missing lines Patch % Lines
...rg/apache/celeborn/common/rpc/RpcEndpointRef.scala 6.25% 15 Missing ⚠️
.../scala/org/apache/celeborn/common/rpc/RpcEnv.scala 0.00% 13 Missing ⚠️
...cala/org/apache/celeborn/common/CelebornConf.scala 93.75% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3008      +/-   ##
==========================================
- Coverage   32.88%   32.55%   -0.33%     
==========================================
  Files         331      336       +5     
  Lines       19800    20102     +302     
  Branches     1780     1800      +20     
==========================================
+ Hits         6510     6542      +32     
- Misses      12929    13195     +266     
- Partials      361      365       +4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

initDataClientFactoryIfNeeded();
}

public <T> T callLifecycleManagerWithTimeoutRetry(Callable<T> callable, String name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of making changes everywhere - do we want to simply change askSync/askAsync to become retry aware ? With number of retries passed in as a param (for specific cases where we dont want retries for ex) ?

Copy link
Contributor Author

@zaynt4606 zaynt4606 Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree to change askSync/askAsync.
There are a lot of exception changes caused by that the setupLifecycleManagerRef will throws RpcTimeoutExceptions which we need to catch. I change the Exception type to RuntimeException

Copy link
Member

@turboFei turboFei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, I wonder that we can introduce two config items.

celeborn.rpc.retryWait for the default retry wait.

celeborn.client.rpc.retryWait for the client specific.

cc @pan3793

@@ -4884,6 +4885,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")

val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.rpc.retryWait")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val RPC_RETRY_WAIT

And you can move this config to celeborn.rpc part.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder that you can introduce a new config celeborn.client.rpc.retryWait for client end.

@@ -30,6 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf)
extends Serializable with Logging {

private[this] val defaultAskTimeout = conf.rpcAskTimeout
private[celeborn] val waitTimeBound = conf.rpcTimeoutRetryWaitMs.toInt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[this] val defaultRetryWait

@@ -104,6 +106,7 @@ object RpcEnv {
abstract class RpcEnv(config: RpcEnvConfig) {

private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout
private[celeborn] val waitTimeBound = config.conf.rpcTimeoutRetryWaitMs.toInt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[celeborn] val defaultRetryWait

* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int): T = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int, retryWait: Long)

def setupEndpointRef(
address: RpcAddress,
endpointName: String,
retryCount: Int): RpcEndpointRef = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def setupEndpointRef(
      address: RpcAddress,
      endpointName: String,
      retryCount: Int,
      retryWait: Long)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sory for delay.
This pr has been updated with two configraions CLIENT_RPC_RETRY_WAIT and RPC_RETRY_WAIT.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants